{
 "metadata": {
  "name": "",
  "signature": "sha256:6e3384a8e47856113d2d0b73cd74f62d0b7b9842ccbaa9c1eae2c2c6603fce39"
 },
 "nbformat": 3,
 "nbformat_minor": 0,
 "worksheets": [
  {
   "cells": [
    {
     "cell_type": "heading",
     "level": 1,
     "metadata": {},
     "source": [
      "Spark SQL and Data Frames"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "[Introduction to Spark with Python, by Jose A. Dianes](http://jadianes.github.io/spark-py-notebooks)"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "This notebook introduces Spark's capabilities for dealing with data in a structured way. Basically, everything revolves around the concept of the *Data Frame* and using the *SQL language* to query it. We will see how the data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), is very powerful when performing exploratory data analysis. In fact, it is very easy to express data queries when it is used together with the SQL language. Moreover, Spark distributes this column-based data structure transparently, in order to make the querying process as efficient as possible."
     ]
    },
    {
     "cell_type": "heading",
     "level": 2,
     "metadata": {},
     "source": [
      "Getting the data and creating the RDD"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "As we did in previous notebooks, we will use the reduced dataset (10 percent) provided for the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "import urllib\n",
      "f = urllib.urlretrieve (\"http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz\", \"kddcup.data_10_percent.gz\")"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 1
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "data_file = \"./kddcup.data_10_percent.gz\"\n",
      "raw_data = sc.textFile(data_file).cache()"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 2
    },
    {
     "cell_type": "heading",
     "level": 2,
     "metadata": {},
     "source": [
      "Getting a Data Frame"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "A Spark `DataFrame` is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. A `DataFrame` can be constructed from a wide array of sources, such as an existing RDD, as in our case."
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "The entry point into all SQL functionality in Spark is the `SQLContext` class. To create a basic instance, all we need is a `SparkContext` reference. Since we are running Spark in shell mode (using pySpark) we can use the global context object `sc` for this purpose.    "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "from pyspark.sql import SQLContext\n",
      "sqlContext = SQLContext(sc)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 3
    },
    {
     "cell_type": "heading",
     "level": 3,
     "metadata": {},
     "source": [
      "Inferring the schema"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "With a `SQLContext`, we are ready to create a `DataFrame` from our existing RDD. But first we need to tell Spark SQL the schema of our data."
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Spark SQL can convert an RDD of `Row` objects to a `DataFrame`. Rows are constructed by passing a list of key/value pairs as *kwargs* to the `Row` class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema."
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "In our case, we first need to split the comma-separated data, and then use the information in the KDD Cup 1999 task description to obtain the [column names](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names)."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "from pyspark.sql import Row\n",
      "\n",
      "csv_data = raw_data.map(lambda l: l.split(\",\"))\n",
      "row_data = csv_data.map(lambda p: Row(\n",
      "    duration=int(p[0]), \n",
      "    protocol_type=p[1],\n",
      "    service=p[2],\n",
      "    flag=p[3],\n",
      "    src_bytes=int(p[4]),\n",
      "    dst_bytes=int(p[5])\n",
      "    )\n",
      ")"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 4
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Once we have our RDD of `Row` we can infer and register the schema.  "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "interactions_df = sqlContext.createDataFrame(row_data)\n",
      "interactions_df.registerTempTable(\"interactions\")"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 5
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Now we can run SQL queries over our data frame that has been registered as a table.  "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "# Select tcp network interactions lasting more than 1000 seconds with no data transferred from the destination\n",
      "tcp_interactions = sqlContext.sql(\"\"\"\n",
      "    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0\n",
      "\"\"\")\n",
      "tcp_interactions.show()"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "duration dst_bytes\n",
        "5057     0        \n",
        "5059     0        \n",
        "5051     0        \n",
        "5056     0        \n",
        "5051     0        \n",
        "5039     0        \n",
        "5062     0        \n",
        "5041     0        \n",
        "5056     0        \n",
        "5064     0        \n",
        "5043     0        \n",
        "5061     0        \n",
        "5049     0        \n",
        "5061     0        \n",
        "5048     0        \n",
        "5047     0        \n",
        "5044     0        \n",
        "5063     0        \n",
        "5068     0        \n",
        "5062     0        \n"
       ]
      }
     ],
     "prompt_number": 6
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "The results of SQL queries are `DataFrame` objects that, in the Spark version used here, also support the normal RDD operations such as `map`."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "# Output duration together with dst_bytes\n",
      "tcp_interactions_out = tcp_interactions.map(lambda p: \"Duration: {}, Dest. bytes: {}\".format(p.duration, p.dst_bytes))\n",
      "for ti_out in tcp_interactions_out.collect():\n",
      "  print ti_out"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "Duration: 5057, Dest. bytes: 0\n",
        "Duration: 5059, Dest. bytes: 0\n",
        "Duration: 5051, Dest. bytes: 0\n",
        "Duration: 5056, Dest. bytes: 0\n",
        "Duration: 5051, Dest. bytes: 0\n",
        "Duration: 5039, Dest. bytes: 0\n",
        "Duration: 5062, Dest. bytes: 0\n",
        "Duration: 5041, Dest. bytes: 0\n",
        "Duration: 5056, Dest. bytes: 0\n",
        "Duration: 5064, Dest. bytes: 0\n",
        "Duration: 5043, Dest. bytes: 0\n",
        "Duration: 5061, Dest. bytes: 0\n",
        "Duration: 5049, Dest. bytes: 0\n",
        "Duration: 5061, Dest. bytes: 0\n",
        "Duration: 5048, Dest. bytes: 0\n",
        "Duration: 5047, Dest. bytes: 0\n",
        "Duration: 5044, Dest. bytes: 0\n",
        "Duration: 5063, Dest. bytes: 0\n",
        "Duration: 5068, Dest. bytes: 0\n",
        "Duration: 5062, Dest. bytes: 0\n",
        "Duration: 5046, Dest. bytes: 0\n",
        "Duration: 5052, Dest. bytes: 0\n",
        "Duration: 5044, Dest. bytes: 0\n",
        "Duration: 5054, Dest. bytes: 0\n",
        "Duration: 5039, Dest. bytes: 0\n",
        "Duration: 5058, Dest. bytes: 0\n",
        "Duration: 5051, Dest. bytes: 0\n",
        "Duration: 5032, Dest. bytes: 0\n",
        "Duration: 5063, Dest. bytes: 0\n",
        "Duration: 5040, Dest. bytes: 0\n",
        "Duration: 5051, Dest. bytes: 0\n",
        "Duration: 5066, Dest. bytes: 0\n",
        "Duration: 5044, Dest. bytes: 0\n",
        "Duration: 5051, Dest. bytes: 0\n",
        "Duration: 5036, Dest. bytes: 0\n",
        "Duration: 5055, Dest. bytes: 0\n",
        "Duration: 2426, Dest. bytes: 0\n",
        "Duration: 5047, Dest. bytes: 0\n",
        "Duration: 5057, Dest. bytes: 0\n",
        "Duration: 5037, Dest. bytes: 0\n",
        "Duration: 5057, Dest. bytes: 0\n",
        "Duration: 5062, Dest. bytes: 0\n",
        "Duration: 5051, Dest. bytes: 0\n",
        "Duration: 5051, Dest. bytes: 0\n",
        "Duration: 5053, Dest. bytes: 0\n",
        "Duration: 5064, Dest. bytes: 0\n",
        "Duration: 5044, Dest. bytes: 0\n",
        "Duration: 5051, Dest. bytes: 0\n",
        "Duration: 5033, Dest. bytes: 0\n",
        "Duration: 5066, Dest. bytes: 0\n",
        "Duration: 5063, Dest. bytes: 0\n",
        "Duration: 5056, Dest. bytes: 0\n",
        "Duration: 5042, Dest. bytes: 0\n",
        "Duration: 5063, Dest. bytes: 0\n",
        "Duration: 5060, Dest. bytes: 0\n",
        "Duration: 5056, Dest. bytes: 0\n",
        "Duration: 5049, Dest. bytes: 0\n",
        "Duration: 5043, Dest. bytes: 0\n",
        "Duration: 5039, Dest. bytes: 0\n",
        "Duration: 5041, Dest. bytes: 0\n",
        "Duration: 42448, Dest. bytes: 0\n",
        "Duration: 42088, Dest. bytes: 0\n",
        "Duration: 41065, Dest. bytes: 0\n",
        "Duration: 40929, Dest. bytes: 0\n",
        "Duration: 40806, Dest. bytes: 0\n",
        "Duration: 40682, Dest. bytes: 0\n",
        "Duration: 40571, Dest. bytes: 0\n",
        "Duration: 40448, Dest. bytes: 0\n",
        "Duration: 40339, Dest. bytes: 0\n",
        "Duration: 40232, Dest. bytes: 0\n",
        "Duration: 40121, Dest. bytes: 0\n",
        "Duration: 36783, Dest. bytes: 0\n",
        "Duration: 36674, Dest. bytes: 0\n",
        "Duration: 36570, Dest. bytes: 0\n",
        "Duration: 36467, Dest. bytes: 0\n",
        "Duration: 36323, Dest. bytes: 0\n",
        "Duration: 36204, Dest. bytes: 0\n",
        "Duration: 32038, Dest. bytes: 0\n",
        "Duration: 31925, Dest. bytes: 0\n",
        "Duration: 31809, Dest. bytes: 0\n",
        "Duration: 31709, Dest. bytes: 0\n",
        "Duration: 31601, Dest. bytes: 0\n",
        "Duration: 31501, Dest. bytes: 0\n",
        "Duration: 31401, Dest. bytes: 0\n",
        "Duration: 31301, Dest. bytes: 0\n",
        "Duration: 31194, Dest. bytes: 0\n",
        "Duration: 31061, Dest. bytes: 0\n",
        "Duration: 30935, Dest. bytes: 0\n",
        "Duration: 30835, Dest. bytes: 0\n",
        "Duration: 30735, Dest. bytes: 0\n",
        "Duration: 30619, Dest. bytes: 0\n",
        "Duration: 30518, Dest. bytes: 0\n",
        "Duration: 30418, Dest. bytes: 0\n",
        "Duration: 30317, Dest. bytes: 0\n",
        "Duration: 30217, Dest. bytes: 0\n",
        "Duration: 30077, Dest. bytes: 0\n",
        "Duration: 25420, Dest. bytes: 0\n",
        "Duration: 22921, Dest. bytes: 0\n",
        "Duration: 22821, Dest. bytes: 0\n",
        "Duration: 22721, Dest. bytes: 0\n",
        "Duration: 22616, Dest. bytes: 0\n",
        "Duration: 22516, Dest. bytes: 0\n",
        "Duration: 22416, Dest. bytes: 0\n",
        "Duration: 22316, Dest. bytes: 0\n",
        "Duration: 22216, Dest. bytes: 0\n",
        "Duration: 21987, Dest. bytes: 0\n",
        "Duration: 21887, Dest. bytes: 0\n",
        "Duration: 21767, Dest. bytes: 0\n",
        "Duration: 21661, Dest. bytes: 0\n",
        "Duration: 21561, Dest. bytes: 0\n",
        "Duration: 21455, Dest. bytes: 0\n",
        "Duration: 21334, Dest. bytes: 0\n",
        "Duration: 21223, Dest. bytes: 0\n",
        "Duration: 21123, Dest. bytes: 0\n",
        "Duration: 20983, Dest. bytes: 0\n",
        "Duration: 14682, Dest. bytes: 0\n",
        "Duration: 14420, Dest. bytes: 0\n",
        "Duration: 14319, Dest. bytes: 0\n",
        "Duration: 14198, Dest. bytes: 0\n",
        "Duration: 14098, Dest. bytes: 0\n",
        "Duration: 13998, Dest. bytes: 0\n",
        "Duration: 13898, Dest. bytes: 0\n",
        "Duration: 13796, Dest. bytes: 0\n",
        "Duration: 13678, Dest. bytes: 0\n",
        "Duration: 13578, Dest. bytes: 0\n",
        "Duration: 13448, Dest. bytes: 0\n",
        "Duration: 13348, Dest. bytes: 0\n",
        "Duration: 13241, Dest. bytes: 0\n",
        "Duration: 13141, Dest. bytes: 0\n",
        "Duration: 13033, Dest. bytes: 0\n",
        "Duration: 12933, Dest. bytes: 0\n",
        "Duration: 12833, Dest. bytes: 0\n",
        "Duration: 12733, Dest. bytes: 0\n",
        "Duration: 12001, Dest. bytes: 0\n",
        "Duration: 5678, Dest. bytes: 0\n",
        "Duration: 5010, Dest. bytes: 0\n",
        "Duration: 1298, Dest. bytes: 0\n",
        "Duration: 1031, Dest. bytes: 0\n",
        "Duration: 36438, Dest. bytes: 0\n"
       ]
      }
     ],
     "prompt_number": 7
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We can easily have a look at our data frame schema using `printSchema`.  "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "interactions_df.printSchema()"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "root\n",
        " |-- dst_bytes: long (nullable = true)\n",
        " |-- duration: long (nullable = true)\n",
        " |-- flag: string (nullable = true)\n",
        " |-- protocol_type: string (nullable = true)\n",
        " |-- service: string (nullable = true)\n",
        " |-- src_bytes: long (nullable = true)\n",
        "\n"
       ]
      }
     ],
     "prompt_number": 8
    },
    {
     "cell_type": "heading",
     "level": 2,
     "metadata": {},
     "source": [
      "Queries as `DataFrame` operations"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Spark `DataFrame` provides a domain-specific language for structured data manipulation. This language includes methods we can chain in order to do selection, filtering, grouping, etc. For example, let's say we want to count how many interactions there are for each protocol type. We can proceed as follows."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "from time import time\n",
      "\n",
      "t0 = time()\n",
      "interactions_df.select(\"protocol_type\", \"duration\", \"dst_bytes\").groupBy(\"protocol_type\").count().show()\n",
      "tt = time() - t0\n",
      "\n",
      "print \"Query performed in {} seconds\".format(round(tt,3))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "protocol_type count \n",
        "udp           20354 \n",
        "tcp           190065\n",
        "icmp          283602\n",
        "Query performed in 20.568 seconds\n"
       ]
      }
     ],
     "prompt_number": 9
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Now imagine that we want to count how many interactions last more than 1000 seconds, with no data transfer from the destination, grouped by protocol type. We can just add two `filter` calls to the previous query."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "t0 = time()\n",
      "interactions_df.select(\"protocol_type\", \"duration\", \"dst_bytes\").filter(interactions_df.duration>1000).filter(interactions_df.dst_bytes==0).groupBy(\"protocol_type\").count().show()\n",
      "tt = time() - t0\n",
      "\n",
      "print \"Query performed in {} seconds\".format(round(tt,3))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "protocol_type count\n",
        "tcp           139  \n",
        "Query performed in 16.641 seconds\n"
       ]
      }
     ],
     "prompt_number": 10
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We can use this to perform some [exploratory data analysis](http://en.wikipedia.org/wiki/Exploratory_data_analysis). Let's count how many attack and normal interactions we have. First we need to add the label column to our data.    "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "def get_label_type(label):\n",
      "    if label!=\"normal.\":\n",
      "        return \"attack\"\n",
      "    else:\n",
      "        return \"normal\"\n",
      "    \n",
      "row_labeled_data = csv_data.map(lambda p: Row(\n",
      "    duration=int(p[0]), \n",
      "    protocol_type=p[1],\n",
      "    service=p[2],\n",
      "    flag=p[3],\n",
      "    src_bytes=int(p[4]),\n",
      "    dst_bytes=int(p[5]),\n",
      "    label=get_label_type(p[41])\n",
      "    )\n",
      ")\n",
      "interactions_labeled_df = sqlContext.createDataFrame(row_labeled_data)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 11
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "This time we don't need to register the data frame as a temporary table, since we are going to use the `DataFrame` query methods rather than SQL."
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Let's check that the previous step actually works by counting the attack and normal interactions in our data frame."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "t0 = time()\n",
      "interactions_labeled_df.select(\"label\").groupBy(\"label\").count().show()\n",
      "tt = time() - t0\n",
      "\n",
      "print \"Query performed in {} seconds\".format(round(tt,3))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "label  count \n",
        "attack 396743\n",
        "normal 97278 \n",
        "Query performed in 17.325 seconds\n"
       ]
      }
     ],
     "prompt_number": 12
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Now we want to count them by label and protocol type, in order to see how important the protocol type is in detecting whether or not an interaction is an attack."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "t0 = time()\n",
      "interactions_labeled_df.select(\"label\", \"protocol_type\").groupBy(\"label\", \"protocol_type\").count().show()\n",
      "tt = time() - t0\n",
      "\n",
      "print \"Query performed in {} seconds\".format(round(tt,3))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "label  protocol_type count \n",
        "attack udp           1177  \n",
        "attack tcp           113252\n",
        "attack icmp          282314\n",
        "normal udp           19177 \n",
        "normal tcp           76813 \n",
        "normal icmp          1288  \n",
        "Query performed in 17.253 seconds\n"
       ]
      }
     ],
     "prompt_number": 13
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "At first sight, *udp* interactions seem to make up a much lower proportion of network attacks than the other protocol types."
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "And we can do much more sophisticated groupings. For example, we can add to the previous query a \"split\" based on whether any data was transferred from the target."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "t0 = time()\n",
      "interactions_labeled_df.select(\"label\", \"protocol_type\", \"dst_bytes\").groupBy(\"label\", \"protocol_type\", interactions_labeled_df.dst_bytes==0).count().show()\n",
      "tt = time() - t0\n",
      "\n",
      "print \"Query performed in {} seconds\".format(round(tt,3))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "label  protocol_type (dst_bytes = 0) count \n",
        "normal icmp          true            1288  \n",
        "attack udp           true            1166  \n",
        "attack udp           false           11    \n",
        "normal udp           true            3594  \n",
        "normal udp           false           15583 \n",
        "attack tcp           true            110583\n",
        "attack tcp           false           2669  \n",
        "normal tcp           true            9313  \n",
        "normal tcp           false           67500 \n",
        "attack icmp          true            282314\n",
        "Query performed in 17.284 seconds\n"
       ]
      }
     ],
     "prompt_number": 14
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We see how relevant this new split is in determining whether a network interaction is an attack."
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We will stop here, but we can see how powerful this type of query is for exploring our data. In fact, we can replicate all the splits we saw in previous notebooks, when introducing classification trees, just by selecting, grouping, and filtering our data frame. For a more detailed (but less real-world) list of Spark's `DataFrame` operations and data sources, have a look at the official documentation [here](https://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations)."
     ]
    }
   ],
   "metadata": {}
  }
 ]
}